Upgrade Kafka read/write transforms without upgrading the pipeline #29362
  "EqualsIncompatibleType",
})
public static @Nullable String findUpgradeURN(
    org.apache.beam.sdk.transforms.PTransform transform) {
can we fix this import?
Updated.
? extends TransformPayloadTranslator>
    entry : registrar.getTransformPayloadTranslators().entrySet()) {
  if (entry.getKey().equals(transform.getClass())) {
    return entry.getValue().getUrn();
Do we care if there are multiple potential matches?
The contract is to upgrade all transforms that match the URN (and TransformUpgrader already handles this).
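For readers following the thread, the lookup in the snippet above can be sketched roughly as follows. This is a self-contained illustration, not the code from the PR: `UrnTranslator`, `ReadTransform`, `WriteTransform`, `UrnLookupSketch`, and the `example:urn:...` strings are hypothetical stand-ins, and each registrar is modeled as a plain map. It assumes the first exact class match is returned and that `null` means no URN is registered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a payload translator; only the URN accessor is modeled.
interface UrnTranslator {
  String getUrn();
}

// Hypothetical stand-ins for two transform classes.
class ReadTransform {}

class WriteTransform {}

class UrnLookupSketch {
  /**
   * Returns the URN of the first translator registered for the transform's
   * exact class, or null if no registrar knows that class.
   */
  static String findUpgradeUrn(
      List<Map<Class<?>, UrnTranslator>> registrars, Object transform) {
    for (Map<Class<?>, UrnTranslator> registrar : registrars) {
      for (Map.Entry<Class<?>, UrnTranslator> entry : registrar.entrySet()) {
        // Exact class comparison, as in the equals(transform.getClass()) check above.
        if (entry.getKey().equals(transform.getClass())) {
          return entry.getValue().getUrn();
        }
      }
    }
    return null;
  }

  /** Builds two example registrars; both register ReadTransform under different URNs. */
  static List<Map<Class<?>, UrnTranslator>> exampleRegistrars() {
    Map<Class<?>, UrnTranslator> first = new LinkedHashMap<>();
    first.put(ReadTransform.class, () -> "example:urn:read:v1");
    first.put(WriteTransform.class, () -> "example:urn:write:v1");

    Map<Class<?>, UrnTranslator> second = new LinkedHashMap<>();
    second.put(ReadTransform.class, () -> "example:urn:read:other");

    List<Map<Class<?>, UrnTranslator>> registrars = new ArrayList<>();
    registrars.add(first);
    registrars.add(second);
    return registrars;
  }
}
```

In this sketch a second registrar that also knows `ReadTransform` is simply never reached, which is one way to read the "multiple potential matches" question: the lookup only picks a URN, and upgrading every transform that carries that URN happens elsewhere.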
@@ -0,0 +1,578 @@
/*
This class looks like it contains a lot of fairly boilerplate code. Given that we will eventually want translation and upgrades for all pre-built composites, is there any way to avoid this boilerplate?
All we do here is build a Row object from the transform object, and vice versa, using utilities available in the Beam Schema library. The rest of the boilerplate is mostly error/null checks, which cannot easily be refactored away. We do have fromByteArray/toByteArray methods here to convert objects that do not have a Schema defined (for example, user-specified functions), but these are small, and different classes can choose to do the serialization in different ways.
Do you have a specific refactoring in mind to simplify this code?
I didn't; good to know this was thought through.
Thanks.
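As an illustration of the fromByteArray/toByteArray helpers mentioned in this thread, here is a minimal sketch that round-trips a schema-less object through plain Java serialization. The PR's actual implementation is not shown in this conversation, so the choice of `ObjectOutputStream`/`ObjectInputStream`, the class name `ByteArraySketch`, and the example function `LengthFn` are all assumptions made for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Hypothetical user-specified function with no schema; it only needs to be Serializable.
class LengthFn implements Function<String, Integer>, Serializable {
  @Override
  public Integer apply(String input) {
    return input.length();
  }
}

class ByteArraySketch {
  /** Serializes an object to bytes using standard Java serialization (an assumption). */
  static byte[] toByteArray(Serializable object) {
    try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
      // Flush so buffered data reaches the byte stream before it is copied.
      out.flush();
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new IllegalStateException("Could not serialize object", e);
    }
  }

  /** Restores an object previously written by toByteArray. */
  static Object fromByteArray(byte[] bytes) {
    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
      return in.readObject();
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("Could not deserialize object", e);
    }
  }
}
```

A caller would store the resulting `byte[]` as one field of the configuration row and restore it with a cast, for example `(LengthFn) ByteArraySketch.fromByteArray(bytes)`. As the reply above notes, other classes may choose a different serialization.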
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@            Coverage Diff             @@
##           master   #29362      +/-   ##
==========================================
+ Coverage   38.32%   38.34%   +0.02%
==========================================
  Files         694      693       -1
  Lines      102373   102227     -146
==========================================
- Hits        39231    39200      -31
+ Misses      61549    61436     -113
+ Partials     1593     1591       -2
PTAL
Run Java PreCommit
Thanks.
This allows pipeline authors to upgrade Beam Kafka read/write transforms to a newer Beam version while keeping the rest of the pipeline on an older Beam version.
This will be useful for pipeline authors who want to use newer releases of the Kafka read/write transforms for bug fixes, performance improvements, etc., but who cannot upgrade the full pipeline (for example, due to organizational restrictions).
Tracking issue: #27943